{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Training Roboschool agents using distributed RL training across multiple nodes with Amazon SageMaker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook is an extension of `rl_roboschool_ray.ipynb` showcasing horizontal scaling of Reinforcement learning using Ray and TensorFlow."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pick which Roboschool problem to solve\n",
    "\n",
    "Roboschool is an [open source](https://github.com/openai/roboschool/tree/master/roboschool) physics simulator that is commonly used to train RL policies for robotic systems.  Roboschool defines a [variety](https://github.com/openai/roboschool/blob/master/roboschool/__init__.py) of Gym environments that correspond to different robotics problems.  Here we're highlighting a few of them at varying levels of difficulty:\n",
    "\n",
    "- **Reacher (easy)** - a very simple robot with just 2 joints reaches for a target\n",
    "- **Hopper (medium)** - a simple robot with one leg and a foot learns to hop down a track \n",
    "- **Humanoid (difficult)** - a complex 3D robot with two arms, two legs, etc. learns to balance without falling over and then to run on a track\n",
    "\n",
    "The simpler problems train faster with less computational resources.  The more complex problems are more fun."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Uncomment the problem to work on\n",
    "roboschool_problem = 'reacher'\n",
    "#roboschool_problem = 'hopper'\n",
    "#roboschool_problem = 'humanoid'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pre-requisites \n",
    "\n",
    "### Imports\n",
    "\n",
    "To get started, we'll import the Python libraries we need, set up the environment with a few prerequisites for permissions and configurations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "import sys\n",
    "import os\n",
    "import glob\n",
    "import re\n",
    "import subprocess\n",
    "from IPython.display import HTML, Markdown\n",
    "import time\n",
    "from time import gmtime, strftime\n",
    "sys.path.append(\"common\")\n",
    "from misc import get_execution_role, wait_for_s3_object\n",
    "from docker_utils import build_and_push_docker_image\n",
    "from sagemaker.rl import RLEstimator, RLToolkit, RLFramework\n",
    "from markdown_helper import generate_help_for_s3_endpoint_permissions, create_s3_endpoint_manually"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup S3 bucket\n",
    "\n",
    "Set up the linkage and authentication to the S3 bucket that you want to use for checkpoint and the metadata. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sage_session = sagemaker.session.Session()\n",
    "s3_bucket = sage_session.default_bucket()  \n",
    "s3_output_path = 's3://{}/'.format(s3_bucket)\n",
    "print(\"S3 bucket path: {}\".format(s3_output_path))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Define Variables \n",
    "\n",
    "We define variables such as the job prefix for the training jobs *and the image path for the container (only when this is BYOC).*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create a descriptive job name \n",
    "job_name_prefix = 'rl-roboschool-distributed-' + roboschool_problem\n",
    "aws_region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Configure where training happens\n",
    "\n",
    "You can train your RL training jobs using the SageMaker notebook instance or local notebook instance. In both of these scenarios, you can run the following in either local or SageMaker modes. The local mode uses the SageMaker Python SDK to run your code in a local container before deploying to SageMaker. This can speed up iterative testing and debugging while using the same familiar Python SDK interface. You just need to set `local_mode = True`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run in local_mode on this machine, or as a SageMaker TrainingJob?\n",
    "local_mode = False\n",
    "\n",
    "if local_mode:\n",
    "    instance_type = 'local'\n",
    "else:\n",
    "    # If on SageMaker, pick the instance type\n",
    "    instance_type = \"ml.c5.2xlarge\"\n",
    "    \n",
    "train_instance_count = 3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create an IAM role\n",
    "\n",
    "Either get the execution role when running from a SageMaker notebook instance `role = sagemaker.get_execution_role()` or, when running from local notebook instance, use utils method `role = get_execution_role()` to create an execution role."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    role = sagemaker.get_execution_role()\n",
    "except:\n",
    "    role = get_execution_role()\n",
    "\n",
    "print(\"Using IAM role arn: {}\".format(role))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Install docker for `local` mode\n",
    "\n",
    "In order to work in `local` mode, you need to have docker installed. When running from you local machine, please make sure that you have docker and docker-compose (for local CPU machines) and nvidia-docker (for local GPU machines) installed. Alternatively, when running from a SageMaker notebook instance, you can simply run the following script to install dependenceis.\n",
    "\n",
    "Note, you can only run a single local notebook at one time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# only run from SageMaker notebook instance\n",
    "if local_mode:\n",
    "    !/bin/bash ./common/setup.sh"
   ]
  },
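  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before switching to `local` mode on your own machine, it can help to confirm that the tools mentioned above are on your `PATH`. The snippet below is a minimal sketch: the helper name `missing_local_mode_tools` is hypothetical, and the tool list per machine type is our reading of the requirements above. Newer Docker installs may provide Compose as a `docker compose` plugin rather than a `docker-compose` binary, which this check would not detect.\n",
    "\n",
    "```python\n",
    "import shutil\n",
    "\n",
    "\n",
    "def missing_local_mode_tools(use_gpu=False):\n",
    "    '''Return the names of the expected command-line tools that are not on PATH.'''\n",
    "    # Assumed mapping: CPU machines need docker-compose, GPU machines need nvidia-docker.\n",
    "    required = ['docker', 'nvidia-docker'] if use_gpu else ['docker', 'docker-compose']\n",
    "    return [tool for tool in required if shutil.which(tool) is None]\n",
    "\n",
    "\n",
    "missing = missing_local_mode_tools(use_gpu=False)\n",
    "if missing:\n",
    "    print('Missing tools for local mode:', ', '.join(missing))\n",
    "else:\n",
    "    print('All local mode tools found.')\n",
    "```"
   ]
  },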
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build docker container\n",
    "\n",
    "We must build a custom docker container with Roboschool installed.  This takes care of everything:\n",
    "\n",
    "1. Fetching base container image\n",
    "2. Installing Roboschool and its dependencies\n",
    "3. Uploading the new container image to ECR\n",
    "\n",
    "This step can take a long time if you are running on a machine with a slow internet connection.  If your notebook instance is in SageMaker or EC2 it should take 3-10 minutes depending on the instance type.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "cpu_or_gpu = 'gpu' if instance_type.startswith('ml.p') else 'cpu'\n",
    "repository_short_name = \"sagemaker-roboschool-ray-%s\" % cpu_or_gpu\n",
    "docker_build_args = {\n",
    "    'CPU_OR_GPU': cpu_or_gpu, \n",
    "    'AWS_REGION': boto3.Session().region_name,\n",
    "}\n",
    "custom_image_name = build_and_push_docker_image(repository_short_name, build_args=docker_build_args)\n",
    "print(\"Using ECR image %s\" % custom_image_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Write the Training Code\n",
    "\n",
    "The training code is in a series of \n",
    "The training code is written in the file “train-{roboschool_problem}.py” which is uploaded in the /src directory. \n",
    "First import the environment files and the preset files, and then define the main() function. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pygmentize src/train-{roboschool_problem}.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Ray homogeneous scaling - Specify train_instance_count > 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Homogeneous scaling allows us to use multiple instances of the same type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY)\n",
    "    \n",
    "estimator = RLEstimator(entry_point=\"train-%s.py\" % roboschool_problem,\n",
    "                        source_dir='src',\n",
    "                        dependencies=[\"common/sagemaker_rl\"],\n",
    "                        image_name=custom_image_name,\n",
    "                        role=role,\n",
    "                        train_instance_type=instance_type,\n",
    "                        train_instance_count=train_instance_count,\n",
    "                        output_path=s3_output_path,\n",
    "                        base_job_name=job_name_prefix,\n",
    "                        metric_definitions=metric_definitions,\n",
    "                        hyperparameters={\n",
    "                          # Attention scientists!  You can override any Ray algorithm parameter here:\n",
    "                            \n",
    "                            # 3 m4.2xl with 8 cores each. We have to leave 1 core for ray scheduler.\n",
    "                            # Don't forget to change this on the basis of instance type.\n",
    "                            \"rl.training.config.num_workers\": (8 * train_instance_count) - 1\n",
    "                            \n",
    "                          #\"rl.training.config.horizon\": 5000,\n",
    "                          #\"rl.training.config.num_sgd_iter\": 10,\n",
    "                        }\n",
    "                    )\n",
    "\n",
    "estimator.fit(wait=local_mode)\n",
    "job_name = estimator.latest_training_job.job_name\n",
    "print(\"Training job: %s\" % job_name)"
   ]
  },
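  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `num_workers` hyperparameter above assumes 8 cores per instance, so it has to be edited by hand whenever the instance type changes. One way to avoid that is a small lookup table, sketched below. `VCPUS_PER_INSTANCE` and `rollout_workers` are hypothetical names that are not part of the SageMaker SDK, and the table only covers the instance types used in this notebook.\n",
    "\n",
    "```python\n",
    "# vCPU counts for the instance types used in this notebook.\n",
    "# Extend this table when you switch to another instance type.\n",
    "VCPUS_PER_INSTANCE = {\n",
    "    'ml.c5.2xlarge': 8,\n",
    "    'ml.c5.4xlarge': 16,\n",
    "    'ml.p3.2xlarge': 8,\n",
    "}\n",
    "\n",
    "\n",
    "def rollout_workers(instance_type, instance_count, reserved_cores=1):\n",
    "    '''Total vCPUs across the cluster minus the cores reserved for the Ray scheduler.'''\n",
    "    if instance_type not in VCPUS_PER_INSTANCE:\n",
    "        raise KeyError('Add the vCPU count for %s to VCPUS_PER_INSTANCE' % instance_type)\n",
    "    return VCPUS_PER_INSTANCE[instance_type] * instance_count - reserved_cores\n",
    "\n",
    "\n",
    "print(rollout_workers('ml.c5.2xlarge', 3))  # 8 * 3 - 1 = 23\n",
    "```\n",
    "\n",
    "In local mode the instance type is `local`, which is not in the table; `os.cpu_count()` is one way to size the worker count there."
   ]
  },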
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Visualization\n",
    "\n",
    "RL training can take a long time.  So while it's running there are a variety of ways we can track progress of the running training job.  Some intermediate output gets saved to S3 during training, so we'll set up to capture that."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Job name: {}\".format(job_name))\n",
    "\n",
    "s3_url = \"s3://{}/{}\".format(s3_bucket,job_name)\n",
    "\n",
    "if local_mode:\n",
    "    output_tar_key = \"{}/output.tar.gz\".format(job_name)\n",
    "else:\n",
    "    output_tar_key = \"{}/output/output.tar.gz\".format(job_name)\n",
    "\n",
    "intermediate_folder_key = \"{}/output/intermediate/\".format(job_name)\n",
    "output_url = \"s3://{}/{}\".format(s3_bucket, output_tar_key)\n",
    "intermediate_url = \"s3://{}/{}\".format(s3_bucket, intermediate_folder_key)\n",
    "\n",
    "print(\"S3 job path: {}\".format(s3_url))\n",
    "print(\"Output.tar.gz location: {}\".format(output_url))\n",
    "print(\"Intermediate folder path: {}\".format(intermediate_url))\n",
    "    \n",
    "tmp_dir = \"/tmp/{}\".format(job_name)\n",
    "os.system(\"mkdir {}\".format(tmp_dir))\n",
    "print(\"Create local folder {}\".format(tmp_dir))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fetch videos of training rollouts\n",
    "Videos of certain rollouts get written to S3 during training.  Here we fetch the last 10 videos from S3, and render the last one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "recent_videos = wait_for_s3_object(s3_bucket, intermediate_folder_key, tmp_dir, \n",
    "                                fetch_only=(lambda obj: obj.key.endswith(\".mp4\") and obj.size>0), limit=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "last_video = sorted(recent_videos)[-1]  # Pick which video to watch\n",
    "os.system(\"mkdir -p ./src/tmp_render/ && cp {} ./src/tmp_render/last_video.mp4\".format(last_video))\n",
    "HTML('<video src=\"./src/tmp_render/last_video.mp4\" controls autoplay></video>')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Plot metrics for training job\n",
    "We can see the reward metric of the training as it's running, using algorithm metrics that are recorded in CloudWatch metrics.  We can plot this to see the performance of the model over time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "from sagemaker.analytics import TrainingJobAnalytics\n",
    "\n",
    "df = TrainingJobAnalytics(job_name, ['episode_reward_mean']).dataframe()\n",
    "num_metrics = len(df)\n",
    "if num_metrics == 0:\n",
    "    print(\"No algorithm metrics found in CloudWatch\")\n",
    "else:\n",
    "    plt = df.plot(x='timestamp', y='value', figsize=(12,5), legend=True, style='b-')\n",
    "    plt.set_ylabel('Mean reward per episode')\n",
    "    plt.set_xlabel('Training time (s)')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Monitor training progress\n",
    "You can repeatedly run the visualization cells to get the latest videos or see the latest metrics as the training job proceeds."
   ]
  },
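  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you would rather wait for the job in a loop than re-run cells by hand, a polling helper along these lines is one option. This is a sketch: `wait_for_job` is a hypothetical helper, and the status source is passed in as a function so that the loop itself does not depend on AWS. The commented lines show how it could be wired to the `describe_training_job` call of the boto3 SageMaker client.\n",
    "\n",
    "```python\n",
    "import time\n",
    "\n",
    "# Training job states after which no further progress is expected.\n",
    "TERMINAL_STATES = {'Completed', 'Failed', 'Stopped'}\n",
    "\n",
    "\n",
    "def wait_for_job(get_status, poll_seconds=60, max_polls=120):\n",
    "    '''Call get_status() until it returns a terminal state or max_polls is reached.'''\n",
    "    status = None\n",
    "    for _ in range(max_polls):\n",
    "        status = get_status()\n",
    "        print('Training job status:', status)\n",
    "        if status in TERMINAL_STATES:\n",
    "            break\n",
    "        time.sleep(poll_seconds)\n",
    "    return status\n",
    "\n",
    "\n",
    "# Example wiring with boto3 (needs AWS credentials, so it is left commented out):\n",
    "# sm = boto3.client('sagemaker')\n",
    "# wait_for_job(lambda: sm.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus'])\n",
    "```"
   ]
  },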
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Ray heterogeneous scaling"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To scale out RL training, we can increase the number of rollout workers. However, with more rollouts, training can often become the bottleneck. To prevent this, we can use an instance with one or more GPUs for training, and multiple CPU instances for rollouts."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since SageMaker supports a single type of instance in a training job, we can achieve the above by spinning two SageMaker jobs and letting them communicate with each other. For the sake of naming, we'll use `Primary cluster` to refer to 1 or more GPU instances, and `Secondary cluster` to refer to the cluster of CPU instances."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> Please note that local_mode cannot be used for testing this type of scaling."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before we configure the SageMaker job, let us first ensure that we run SageMaker in VPC mode. VPC mode will allow the two SageMaker jobs to communicate over network."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This can be done by supplying subnets and security groups to the job launching scripts. We will use the default VPC configuration for this example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ec2 = boto3.client('ec2')\n",
    "default_vpc = [vpc['VpcId'] for vpc in ec2.describe_vpcs()['Vpcs'] if vpc[\"IsDefault\"] == True][0]\n",
    "\n",
    "default_security_groups = [group[\"GroupId\"] for group in ec2.describe_security_groups()['SecurityGroups'] \\\n",
    "                   if group[\"GroupName\"] == \"default\" and group[\"VpcId\"] == default_vpc]\n",
    "\n",
    "default_subnets = [subnet[\"SubnetId\"] for subnet in ec2.describe_subnets()[\"Subnets\"] \\\n",
    "                  if subnet[\"VpcId\"] == default_vpc and subnet['DefaultForAz']==True]\n",
    "\n",
    "print(\"Using default VPC:\", default_vpc)\n",
    "print(\"Using default security group:\", default_security_groups)\n",
    "print(\"Using default subnets:\", default_subnets)"
   ]
  },
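  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The list indexing in the cell above raises a bare `IndexError` when the account has no default VPC in the current region. As a sketch of a friendlier check, the hypothetical helper below takes the `Vpcs` list returned by `ec2.describe_vpcs()` and fails with an explanatory message instead. The sample IDs are made up.\n",
    "\n",
    "```python\n",
    "def pick_default_vpc(vpcs):\n",
    "    '''Return the VpcId of the default VPC from a describe_vpcs() Vpcs list.'''\n",
    "    default_ids = [vpc['VpcId'] for vpc in vpcs if vpc.get('IsDefault')]\n",
    "    if not default_ids:\n",
    "        raise RuntimeError('No default VPC found in this region; create one or '\n",
    "                           'pass your own subnets and security groups.')\n",
    "    return default_ids[0]\n",
    "\n",
    "\n",
    "# Example with a response-shaped list (the IDs are made up):\n",
    "sample_vpcs = [\n",
    "    {'VpcId': 'vpc-11111111', 'IsDefault': False},\n",
    "    {'VpcId': 'vpc-22222222', 'IsDefault': True},\n",
    "]\n",
    "print(pick_default_vpc(sample_vpcs))\n",
    "```"
   ]
  },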
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A SageMaker job running in VPC mode cannot access S3 resources. So, we need to create a VPC S3 endpoint to allow S3 access from SageMaker container. To learn more about the VPC mode, please visit [this link.](https://docs.aws.amazon.com/sagemaker/latest/dg/train-vpc.html)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    route_tables = [route_table[\"RouteTableId\"] for route_table in ec2.describe_route_tables()['RouteTables']\\\n",
    "                if route_table['VpcId'] == default_vpc]\n",
    "except Exception as e:\n",
    "    if \"UnauthorizedOperation\" in str(e):\n",
    "        display(Markdown(generate_help_for_s3_endpoint_permissions(role)))\n",
    "    else:\n",
    "        display(Markdown(create_s3_endpoint_manually(aws_region, default_vpc)))\n",
    "    raise e\n",
    "\n",
    "print(\"Trying to attach S3 endpoints to the following route tables:\", route_tables)\n",
    "\n",
    "assert len(route_tables) >= 1, \"No route tables were found. Please follow the VPC S3 endpoint creation \"\\\n",
    "                              \"guide by clicking the above link.\"\n",
    "\n",
    "try:\n",
    "    ec2.create_vpc_endpoint(DryRun=False,\n",
    "                           VpcEndpointType=\"Gateway\",\n",
    "                           VpcId=default_vpc,\n",
    "                           ServiceName=\"com.amazonaws.{}.s3\".format(aws_region),\n",
    "                           RouteTableIds=route_tables)\n",
    "    print(\"S3 endpoint created successfully!\")\n",
    "except Exception as e:\n",
    "    if \"RouteAlreadyExists\" in str(e):\n",
    "        print(\"S3 endpoint already exists.\")\n",
    "    elif \"UnauthorizedOperation\" in str(e):\n",
    "        display(Markdown(generate_help_for_s3_endpoint_permissions(role)))\n",
    "        raise e\n",
    "    else:\n",
    "        display(Markdown(create_s3_endpoint_manually(aws_region, default_vpc)))\n",
    "        raise e"
   ]
  },
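  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you want to check for an existing endpoint before calling `create_vpc_endpoint`, one option is to inspect the output of `ec2.describe_vpc_endpoints()`. The helper below is a hypothetical sketch that works on the `VpcEndpoints` list from that response. It assumes the `VpcId`, `ServiceName`, `VpcEndpointType`, and `State` fields and compares the state case-insensitively. The sample data is made up.\n",
    "\n",
    "```python\n",
    "def has_s3_gateway_endpoint(endpoints, vpc_id, region):\n",
    "    '''Check a describe_vpc_endpoints() VpcEndpoints list for an available S3 gateway endpoint.'''\n",
    "    service_name = 'com.amazonaws.{}.s3'.format(region)\n",
    "    for endpoint in endpoints:\n",
    "        if (endpoint.get('VpcId') == vpc_id\n",
    "                and endpoint.get('ServiceName') == service_name\n",
    "                and endpoint.get('VpcEndpointType') == 'Gateway'\n",
    "                and str(endpoint.get('State', '')).lower() == 'available'):\n",
    "            return True\n",
    "    return False\n",
    "\n",
    "\n",
    "# Example with a response-shaped list (the IDs are made up):\n",
    "sample_endpoints = [\n",
    "    {'VpcId': 'vpc-22222222', 'ServiceName': 'com.amazonaws.us-west-2.s3',\n",
    "     'VpcEndpointType': 'Gateway', 'State': 'available'},\n",
    "]\n",
    "print(has_s3_gateway_endpoint(sample_endpoints, 'vpc-22222222', 'us-west-2'))\n",
    "\n",
    "# Example wiring with boto3 (needs AWS credentials, so it is left commented out):\n",
    "# endpoints = ec2.describe_vpc_endpoints()['VpcEndpoints']\n",
    "# print(has_s3_gateway_endpoint(endpoints, default_vpc, aws_region))\n",
    "```"
   ]
  },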
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Configure instance types"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us configure a cluster with 1 Volta (V100) GPU and 40 CPU cores. We can do this by using 1 ml.p3.2xlarge instance and 2 ml.c5.4xlarge instances, since ml.p3.2xlarge has 8 CPU cores and ml.c5.4xlarge has 16 CPU cores."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "# Build CPU image\n",
    "cpu_repository_short_name = \"sagemaker-roboschool-ray-%s\" % \"cpu\"\n",
    "docker_build_args = {\n",
    "    'CPU_OR_GPU': \"cpu\", \n",
    "    'AWS_REGION': boto3.Session().region_name,\n",
    "}\n",
    "cpu_image_name = build_and_push_docker_image(repository_short_name, build_args=docker_build_args)\n",
    "print(\"Using CPU ECR image %s\" % cpu_image_name)\n",
    "\n",
    "# Build GPU image\n",
    "gpu_repository_short_name = \"sagemaker-roboschool-ray-%s\" % \"gpu\"\n",
    "docker_build_args = {\n",
    "    'CPU_OR_GPU': \"gpu\", \n",
    "    'AWS_REGION': boto3.Session().region_name,\n",
    "}\n",
    "gpu_image_name = build_and_push_docker_image(repository_short_name, build_args=docker_build_args)\n",
    "print(\"Using GPU ECR image %s\" % gpu_image_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "primary_cluster_instance_type = \"ml.p3.2xlarge\"\n",
    "primary_cluster_instance_count = 1\n",
    "\n",
    "secondary_cluster_instance_type = \"ml.c5.4xlarge\"\n",
    "secondary_cluster_instance_count = 2\n",
    "\n",
    "total_cpus = 40 - 1 # Leave one for ray scheduler\n",
    "total_gpus = 1"
   ]
  },
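  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick sanity check of the numbers above, the snippet below recomputes the core count from the per-instance vCPU figures quoted earlier (8 for `ml.p3.2xlarge`, 16 for `ml.c5.4xlarge`). The `clusters` dictionary is only an illustrative layout; if you change instance types or counts, the hard-coded `40` in `total_cpus` needs to change with them.\n",
    "\n",
    "```python\n",
    "# (vCPUs per instance, instance count) for each cluster.\n",
    "clusters = {\n",
    "    'primary: ml.p3.2xlarge': (8, 1),\n",
    "    'secondary: ml.c5.4xlarge': (16, 2),\n",
    "}\n",
    "\n",
    "cluster_vcpus = sum(vcpus * count for vcpus, count in clusters.values())\n",
    "workers = cluster_vcpus - 1  # one core stays free for the Ray scheduler\n",
    "\n",
    "print(cluster_vcpus, workers)  # 40 39\n",
    "```"
   ]
  },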
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we choose the roboschool agent that we want to train. For heterogeneous training, we also pass some additional parameters to the training job that aid in synchronization across instances:\n",
    "- s3_bucket, s3_prefix: Used for storing metadata like master IP address\n",
    "- rl_cluster_type: \"primary\" or \"secondary\"\n",
    "- aws_region: This is required for making connection to S3 in VPC mode\n",
    "- rl_num_instances_secondary: Number of nodes in secondary cluster\n",
    "- subnets, security_group_ids: Required by VPC mode"
   ]
  },
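  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because both jobs must agree on the bucket, prefix, and region, it can be convenient to build the synchronization hyperparameters listed above from one place. The function below is a hypothetical sketch, not part of the notebook's `common` helpers, and the bucket, prefix, and region in the example are placeholder values. Only the primary cluster receives `rl_num_instances_secondary`, matching the estimator cells in this notebook.\n",
    "\n",
    "```python\n",
    "def sync_hyperparameters(cluster_type, s3_bucket, s3_prefix, aws_region, num_secondary=None):\n",
    "    '''Build the synchronization hyperparameters for one cluster.'''\n",
    "    if cluster_type not in ('primary', 'secondary'):\n",
    "        raise ValueError('cluster_type must be primary or secondary')\n",
    "    params = {\n",
    "        's3_bucket': s3_bucket,\n",
    "        's3_prefix': s3_prefix,\n",
    "        'aws_region': aws_region,\n",
    "        'rl_cluster_type': cluster_type,\n",
    "    }\n",
    "    if cluster_type == 'primary':\n",
    "        if num_secondary is None:\n",
    "            raise ValueError('the primary cluster needs num_secondary')\n",
    "        params['rl_num_instances_secondary'] = num_secondary\n",
    "    return params\n",
    "\n",
    "\n",
    "# Bucket, prefix, and region below are placeholder values.\n",
    "primary = sync_hyperparameters('primary', 'my-bucket', 'dist-ray-reacher', 'us-west-2', num_secondary=2)\n",
    "secondary = sync_hyperparameters('secondary', 'my-bucket', 'dist-ray-reacher', 'us-west-2')\n",
    "print(primary)\n",
    "print(secondary)\n",
    "```\n",
    "\n",
    "The returned dictionaries can be merged with the `rl.training.config.*` entries before being passed as `hyperparameters` to each estimator."
   ]
  },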
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "roboschool_problem = 'reacher'\n",
    "job_name_prefix = 'rl-roboschool-distributed-'+ roboschool_problem\n",
    "\n",
    "s3_output_path = 's3://{}/'.format(s3_bucket) # SDK appends the job name and output folder\n",
    "\n",
    "# We explicitly need to specify these params so that the two jobs can synchronize using the metadata stored here\n",
    "s3_bucket = sage_session.default_bucket()\n",
    "s3_prefix = \"dist-ray-%s-1GPU-40CPUs\" % (roboschool_problem)\n",
    "\n",
    "# Make sure that the prefix is empty\n",
    "!aws s3 rm --recursive s3://{s3_bucket}/{s3_prefix}    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Launch primary cluster (1 GPU training instance)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "primary_cluster_estimator = RLEstimator(entry_point=\"train-%s.py\" % roboschool_problem,\n",
    "                            source_dir='src',\n",
    "                            dependencies=[\"common/sagemaker_rl\"],\n",
    "                            image_name=gpu_image_name,\n",
    "                            role=role,\n",
    "                            train_instance_type=primary_cluster_instance_type,\n",
    "                            train_instance_count=primary_cluster_instance_count,\n",
    "                            output_path=s3_output_path,\n",
    "                            base_job_name=job_name_prefix,\n",
    "                            metric_definitions=metric_definitions,\n",
    "                            train_max_run=int(3600 * .5), # Maximum runtime in seconds\n",
    "                            hyperparameters={\n",
    "                                \"s3_prefix\": s3_prefix, # Important for syncing\n",
    "                                \"s3_bucket\": s3_bucket, # Important for syncing\n",
    "                                \"aws_region\": boto3.Session().region_name, # Important for S3 connection\n",
    "                                \"rl_cluster_type\": \"primary\", # Important for syncing\n",
    "                                \"rl_num_instances_secondary\": secondary_cluster_instance_count, # Important for syncing\n",
    "                                \"rl.training.config.num_workers\": total_cpus,\n",
    "                                \"rl.training.config.train_batch_size\": 20000,\n",
    "                                \"rl.training.config.num_gpus\": total_gpus,\n",
    "                            },\n",
    "                            subnets=default_subnets, # Required for VPC mode\n",
    "                            security_group_ids=default_security_groups # Required for VPC mode\n",
    "                        )\n",
    "\n",
    "primary_cluster_estimator.fit(wait=False)\n",
    "primary_job_name = primary_cluster_estimator.latest_training_job.job_name\n",
    "print(\"Primary Training job: %s\" % primary_job_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Launch secondary cluster (2 CPU instances)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "secondary_cluster_estimator = RLEstimator(entry_point=\"train-%s.py\" % roboschool_problem,\n",
    "                            source_dir='src',\n",
    "                            dependencies=[\"common/sagemaker_rl\"],\n",
    "                            image_name=cpu_image_name,\n",
    "                            role=role,\n",
    "                            train_instance_type=secondary_cluster_instance_type,\n",
    "                            train_instance_count=secondary_cluster_instance_count,\n",
    "                            output_path=s3_output_path,\n",
    "                            base_job_name=job_name_prefix,\n",
    "                            metric_definitions=metric_definitions,\n",
    "                            train_max_run=3600, # Maximum runtime in seconds\n",
    "                            hyperparameters={\n",
    "                                \"s3_prefix\": s3_prefix, # Important for syncing\n",
    "                                \"s3_bucket\": s3_bucket, # Important for syncing\n",
    "                                \"aws_region\": boto3.Session().region_name, # Important for S3 connection\n",
    "                                \"rl_cluster_type\": \"secondary\", # Important for syncing\n",
    "                            },\n",
    "                            subnets=default_subnets, # Required for VPC mode\n",
    "                            security_group_ids=default_security_groups # Required for VPC mode\n",
    "                        )\n",
    "\n",
    "secondary_cluster_estimator.fit(wait=False)\n",
    "secondary_job_name = secondary_cluster_estimator.latest_training_job.job_name\n",
    "print(\"Secondary Training job: %s\" % secondary_job_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Visualization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Job name: {}\".format(primary_job_name))\n",
    "\n",
    "s3_url = \"s3://{}/{}\".format(s3_bucket,primary_job_name)\n",
    "\n",
    "if local_mode:\n",
    "    output_tar_key = \"{}/output.tar.gz\".format(primary_job_name)\n",
    "else:\n",
    "    output_tar_key = \"{}/output/output.tar.gz\".format(primary_job_name)\n",
    "\n",
    "intermediate_folder_key = \"{}/output/intermediate/\".format(primary_job_name)\n",
    "output_url = \"s3://{}/{}\".format(s3_bucket, output_tar_key)\n",
    "intermediate_url = \"s3://{}/{}\".format(s3_bucket, intermediate_folder_key)\n",
    "\n",
    "print(\"S3 job path: {}\".format(s3_url))\n",
    "print(\"Output.tar.gz location: {}\".format(output_url))\n",
    "print(\"Intermediate folder path: {}\".format(intermediate_url))\n",
    "    \n",
    "tmp_dir = \"/tmp/{}\".format(primary_job_name)\n",
    "os.system(\"mkdir {}\".format(tmp_dir))\n",
    "print(\"Create local folder {}\".format(tmp_dir))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fetch videos of training rollouts\n",
    "Videos of certain rollouts get written to S3 during training.  Here we fetch the last 10 videos from S3, and render the last one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "recent_videos = wait_for_s3_object(s3_bucket, intermediate_folder_key, tmp_dir, \n",
    "                                fetch_only=(lambda obj: obj.key.endswith(\".mp4\") and obj.size>0), limit=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "last_video = sorted(recent_videos)[-1]  # Pick which video to watch\n",
    "os.system(\"mkdir -p ./src/tmp_render/ && cp {} ./src/tmp_render/last_video.mp4\".format(last_video))\n",
    "HTML('<video src=\"./src/tmp_render/last_video.mp4\" controls autoplay></video>')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Plot metrics for training job\n",
    "We can see the reward metric of the training as it's running, using algorithm metrics that are recorded in CloudWatch metrics.  We can plot this to see the performance of the model over time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "from sagemaker.analytics import TrainingJobAnalytics\n",
    "\n",
    "df = TrainingJobAnalytics(primary_job_name, ['episode_reward_mean']).dataframe()\n",
    "num_metrics = len(df)\n",
    "if num_metrics == 0:\n",
    "    print(\"No algorithm metrics found in CloudWatch\")\n",
    "else:\n",
    "    plt = df.plot(x='timestamp', y='value', figsize=(12,5), legend=True, style='b-')\n",
    "    plt.set_ylabel('Mean reward per episode')\n",
    "    plt.set_xlabel('Training time (s)')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And that's it! You can repeatedly run the visualization cells to get the latest videos or see the latest metrics as the training job proceeds."
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  },
  "notice": "Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
